
READING IN MULTIPLE FILES WITH GOOGLE CLOUD DATAFLOW (reading multiple files from GCS into a PCollection)

When you want to read files from multiple folders in Google Cloud Storage in a Dataflow pipeline (as I had to do this week), you might run into a little trouble (as described in this Stack Overflow question). In this tip I will describe how I solved the problem and provide some code you can use when you have a comma-separated list of file paths you want to process.

Steps

What you can do in this case is read each path in the usual way using TextIO, which gives you one PCollection per path, then construct a PCollectionList from these PCollections and flatten the whole thing into one PCollection.

Code

When you have a comma-separated list as input, containing all the paths you want to read (which can easily be passed as a command-line argument), you should be able to use the following Java code:

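import java.util.ArrayList;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;

// p is your Pipeline object
String inputs = "gs://xxx/*,gs://yyy/zzz/*"; // (or options.getInput())
ArrayList<PCollection<String>> pcollectionlist = new ArrayList<>();

// Read every path into its own PCollection
String[] input = inputs.split(",");
for (String i : input) {
	PCollection<String> extra = p.apply(TextIO.read().from(i));
	pcollectionlist.add(extra);
}

// Merge all the PCollections into a single one
PCollectionList<String> tempRes = PCollectionList.of(pcollectionlist);
PCollection<String> res = tempRes.apply(Flatten.pCollections());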
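
One thing to watch out for: a plain split(",") keeps stray spaces and empty entries when the argument is written like "gs://xxx/*, gs://yyy/zzz/*,", and those would then be handed to TextIO as paths. Below is a minimal sketch of a more forgiving parser you could run on the input first. The class name InputPaths and the method parseInputPaths are made up for this example and are not part of the Dataflow or Beam SDK; it uses only the Java standard library.

```java
import java.util.ArrayList;
import java.util.List;

public class InputPaths {

    // Hypothetical helper (not part of any SDK): splits a comma-separated
    // argument into clean paths, trimming whitespace and skipping empty entries.
    static List<String> parseInputPaths(String inputs) {
        List<String> paths = new ArrayList<>();
        if (inputs == null) {
            return paths;
        }
        for (String part : inputs.split(",")) {
            String path = part.trim();
            if (!path.isEmpty()) {
                paths.add(path);
            }
        }
        return paths;
    }

    public static void main(String[] args) {
        // Note the space after the first comma and the trailing comma.
        System.out.println(parseInputPaths("gs://xxx/*, gs://yyy/zzz/*,"));
        // prints [gs://xxx/*, gs://yyy/zzz/*]
    }
}
```

You would then loop over the returned list instead of over the raw result of split(",") when building the list of PCollections.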

Result

And this is what your input graph could look like, for example with 7 input files:

[Image: Dataflow graph]

